package com.pms.utils;

/**
 * Created by Administrator on 2018/1/9.
 */

import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
 * Kafka utility class: builds pre-configured producers/consumers and offers
 * one-shot send/subscribe helpers. Connection and SSL settings are loaded
 * from {@code client.properties} on the classpath.
 */
public class KafkaUtil {

    /** Maximum time (ms) a one-shot subscribe blocks in {@code poll}. */
    private static final int TIME_OUT_MS = 5000;

    public static int getTimeOutMs() {
        return TIME_OUT_MS;
    }

    /**
     * Loads {@code client.properties} from the classpath and rewrites the SSL
     * trust/key store locations to absolute paths rooted at the classpath root.
     *
     * @return the loaded properties; possibly empty or incomplete if loading
     *         failed (best-effort, matching the original contract)
     */
    public static Properties getProperties() {
        Properties properties = new Properties();
        // try-with-resources: the original never closed this stream (leak).
        try (InputStream in = KafkaUtil.class.getClassLoader().getResourceAsStream("client.properties")) {
            if (in == null) {
                // Config file missing from classpath; the original threw an
                // uncaught NPE here. Return the (empty) properties instead.
                return properties;
            }
            properties.load(in);
            java.net.URL root = KafkaUtil.class.getResource("/");
            if (root != null) {
                // Prefix the relative store paths so the SSL engine can resolve them.
                String rootPath = root.getPath();
                properties.put("ssl.truststore.location", rootPath + properties.get("ssl.truststore.location"));
                properties.put("ssl.keystore.location", rootPath + properties.get("ssl.keystore.location"));
            }
        } catch (IOException ex) {
            // Best-effort: keep the original report-and-continue behavior.
            ex.printStackTrace();
        }
        return properties;
    }

    /**
     * Builds a producer wired to the Baidu BCE Kafka endpoint with String
     * key/value serialization and acks=1. Caller is responsible for closing it.
     */
    public static KafkaProducer<String, String> getProducerInstance() {
        Properties properties = getProperties();
        // Initial host:port used to discover the cluster
        // (comma-separated host1:port1,host2:port2 for multiple brokers).
        properties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "kafka.bj.baidubce.com:9091");
        properties.setProperty(CommonClientConfigs.CLIENT_ID_CONFIG, "kafka-samples-java-producer-5b6c2f0b08c94ca3a6e94c7fe884dd41");
        // Serialize caller-supplied keys and values to bytes.
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Broker-ack level: 0 = no ack; 1 = leader only (current setting);
        // all/-1 = wait until every in-sync replica has the message.
        properties.setProperty(ProducerConfig.ACKS_CONFIG, "1");
        return new KafkaProducer<String, String>(properties);
    }

    /**
     * Sends a single message on a throwaway producer.
     *
     * @param topic destination topic
     * @param key   message key
     * @param value message payload
     */
    public static void send(String topic, String key, String value) {
        KafkaProducer<String, String> producer = getProducerInstance();
        try {
            producer.send(new ProducerRecord<String, String>(topic, key, value));
        } finally {
            // close() flushes any in-flight sends before releasing resources.
            producer.close();
        }
    }

    /**
     * Sends every entry of {@code messageMap} as a (key, value) message to
     * {@code topic}. No-op for a null/empty map — the original created a
     * producer before that check and leaked it on the empty path.
     *
     * @param topic      destination topic
     * @param messageMap key/value pairs to send; may be null or empty
     */
    public static void send(String topic, Map<String, String> messageMap) {
        if (messageMap == null || messageMap.isEmpty()) {
            return; // nothing to send; do not create (and leak) a producer
        }
        KafkaProducer<String, String> producer = getProducerInstance();
        try {
            for (Map.Entry<String, String> entry : messageMap.entrySet()) {
                producer.send(new ProducerRecord<String, String>(topic, entry.getKey(), entry.getValue()));
            }
        } finally {
            producer.close();
        }
    }

    /**
     * Builds a consumer wired to the Baidu BCE Kafka endpoint with String
     * key/value deserialization.
     *
     * @return a new consumer, or {@code null} when {@code client.properties}
     *         still contains the placeholder keystore password (template not
     *         yet configured)
     */
    public static KafkaConsumer<String, String> getConsumerInstance() {
        Properties properties = getProperties();
        // Refuse to build a consumer from an unconfigured config template.
        if ("your_keystore_password".equals(properties.getProperty("ssl.keystore.password"))) {
            return null;
        }
        properties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "kafka.bj.baidubce.com:9091");
        properties.setProperty(CommonClientConfigs.CLIENT_ID_CONFIG, "kafka-samples-java-consumer-5b6c2f0b08c94ca3a6e94c7fe884dd41");
        // Consumer group id: members of a group share the partition assignment.
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kafka-samples-java-group-5b6c2f0b08c94ca3a6e94c7fe884dd41");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Start from the earliest offset when no committed offset exists yet.
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return new KafkaConsumer<String, String>(properties);
    }

    /**
     * One-shot subscribe+poll of the given topics on a throwaway consumer.
     *
     * @param topicList topics to subscribe to
     */
    public static ConsumerRecords<String, String> subscribe(List<String> topicList) {
        return subscribe(null, topicList);
    }

    /**
     * Subscribes to the given topics and polls once (up to {@link #TIME_OUT_MS} ms).
     *
     * <p>If {@code consumerKafka} is null a throwaway consumer is created and
     * closed before returning. A caller-supplied consumer is left OPEN so it
     * can actually be reused (the original closed it unconditionally, defeating
     * the parameter); release it via {@link #closeConsumer(KafkaConsumer)}.
     *
     * @param consumerKafka consumer to reuse, or null to use a throwaway one
     * @param topicList     topics to subscribe to; empty records for null/empty
     */
    public static ConsumerRecords<String, String> subscribe(KafkaConsumer<String, String> consumerKafka, List<String> topicList) {
        boolean ownsConsumer = (consumerKafka == null);
        if (ownsConsumer) {
            consumerKafka = getConsumerInstance();
            if (consumerKafka == null) {
                // Placeholder credentials: nothing to poll (the original NPEd
                // in its finally block on this path).
                return ConsumerRecords.<String, String>empty();
            }
        }
        try {
            if (topicList == null || topicList.isEmpty()) {
                return ConsumerRecords.<String, String>empty();
            }
            consumerKafka.subscribe(topicList);
            return consumerKafka.poll(TIME_OUT_MS);
        } finally {
            // Only close consumers this method created.
            if (ownsConsumer) {
                consumerKafka.close();
            }
        }
    }

    /**
     * One-shot subscribe+poll of a single topic on a throwaway consumer.
     *
     * @param topic topic to subscribe to
     */
    public static ConsumerRecords<String, String> subscribe(String topic) {
        return subscribe(null, topic);
    }

    /**
     * Subscribes to a single topic and polls once. Same ownership rules as
     * {@link #subscribe(KafkaConsumer, List)}.
     *
     * @param consumerKafka consumer to reuse, or null to use a throwaway one
     * @param topic         topic to subscribe to; empty records for blank input
     */
    public static ConsumerRecords<String, String> subscribe(KafkaConsumer<String, String> consumerKafka, String topic) {
        if (StringUtils.isBlank(topic)) {
            return ConsumerRecords.<String, String>empty();
        }
        return subscribe(consumerKafka, Collections.singletonList(topic));
    }

    /**
     * Closes the consumer if non-null.
     *
     * @param consumerKafka consumer to close; may be null
     */
    public static void closeConsumer(KafkaConsumer<String, String> consumerKafka) {
        if (consumerKafka != null) {
            consumerKafka.close();
        }
    }

    /**
     * Closes the producer if non-null. (Method name kept for backward
     * compatibility even though it closes a producer, not a consumer.)
     *
     * @param producer producer to close; may be null
     */
    public static void closeConsumer(KafkaProducer<String, String> producer) {
        if (producer != null) {
            producer.close();
        }
    }

}
